跳到主要内容

Redis 事务的基本使用

什么是 Redis 的事务

Redis 的基本事务(basic transaction)需要用到 MULTI 命令和 EXEC 命令,这种事务可以让一个客户端在不被其他客户端打断的情况下执行多个命令。

和关系数据库那种可以在执行的过程中进行回滚(rollback)的事务不同,在 Redis 里面,被 MULTI 命令和 EXEC 命令包围的所有命令会一个接一个地执行,直到所有命令都执行完毕为止,即一个事务完成。当一个事务执行完毕之后,Redis 才会处理其他客户端的命令。

要在 Redis 里面执行事务,我们首先需要执行 MULTI 命令,然后输入那些我们想要在事务里面执行的命令,最后再执行 EXEC 命令。当 Redis 从一个客户端那里接收到 MULTI 命令时,Redis 会将这个客户端之后发送的所有命令都放人到一个队列里面,直到这个客户端发送 EXEC 命令为止,然后 Redis 就会在不被打断的情况下,一个接一个地执行存储在队列里面的命令。

Redis 事务命令

命令描述
DISCARD取消事务,放弃执行事务块内的所有命令。
EXEC执行所有事务块内的命令。
MULTI标记一个事务块的开始。
UNWATCH取消 WATCH 命令对所有 key 的监视。
WATCH key监视一个(或多个) key ,如果在事务执行之前这个 key 被其他命令所改动,那么事务将被打断。

watch 监控事务

在 Redis 中使用 watch 命令可以决定事务是执行还是回滚。一般而言,可以在 multi 命令之前使用 watch 命令监控某些键值对,然后使用 multi 命令开启事务,执行各类对数据结构进行操作的命令,这个时候这些命令就会进入队列。

当 Redis 使用 exec 命令执行事务的时候,它首先会去比对被 watch 命令所监控的键值对,如果没有发生变化,那么它会执行事务队列中的命令,提交事务;

如果发生变化,那么它不会执行任何事务中的命令,而去事务回滚。

无论事务是否回滚,Redis 都会去取消执行事务前的 watch 命令,这个过程如图 1 所示。

Redis 参考了多线程中 使用的 CAS(比较与交换,Compare And Swap)去执行的。在数据高并发环境的操作中,我们把这样的一个机制称为乐观锁。

为什么需要事务?

先来看下不使用事务会出现的问题(这里直接使用书上的代码):

注意:这里的问题不是 Redis 的值会和最终结果不一致,而是在两个线程加减操作之间可能其它线程执行了这个加操作

如下:

时间  值的大小  操作线程
1. v: 0
2. v: 1 线程 1 ++
3. v: 2 线程 2 ++
4. v: 1 线程 1 --
5. v: 2 线程 3 ++
6. v: 1 线程 2 --
7. v: 0 线程 3 --

因为没有使用事务,所以 3 个线程都可以在执行自减操作之前,对 notrans: 计数器执行自增操作,虽然最终的结果都是 0,但是如果某个线程在结果非零的情况下读取了值,可能就会读取到错误的值

虽然代码清单里面通过休眠 100 毫秒的方式来放大了潜在的问题,但如果我们确实需要在不受其他命令干扰的情况下,对计数器执行自增操作和自减操作,那么我们就不得不解决这个潜在的问题。

Redis 事务可以一次执行多个命令(注意 Redis 的每条命令都是原子操作), 并且带有以下三个重要的保证:

  • 批量操作在发送 EXEC 命令前被放入队列缓存。
  • 收到 EXEC 命令后进入事务执行,事务中任意命令执行失败,其余的命令依然被执行。
  • 在事务执行过程,其他客户端提交的命令请求不会插入到事务执行命令序列中。

一个事务从开始到执行会经历以下三个阶段:

  • 开始事务。
  • 命令入队。
  • 执行事务。

为什么 Redis 没有加锁?

结论:因为它使用了乐观锁

为什么 Redis 没有实现典型的加锁功能?在访问以写入为目的数据的时候(SQL 中的 SELECT FOR UPDATE),关系数据库会对被访问的数据行进行加锁,直到事务被提交(COMMIT)或者被回滚(ROLLBACK)为止。

如果有其他客户端试图对被加锁的数据行进行写入,那么该客户端将被阻塞,直到第一个事务执行完毕为止。加锁在实际使用中非常有效,基本上所有关系数据库都实现了这种加锁功能,它的缺点在于,持有锁的客户端运行越慢,等待解锁的客户端被阻塞的时间就越长。

因为加锁有可能会造成长时间的等待,所以 Redis 为了尽可能地减少客户端的等待时间,并不会在执行 WATCH 命令时对数据进行加锁。相反地,Redis 只会在数据已经被其他客户端抢先修改了的情况下,通知执行了 WATCH 命令的客户端,这种做法被称为 乐观锁(optimistic locking),而关系数据库实际执行的加锁操作则被称为悲观锁(pessimistic locking)。 乐观锁在实际使用中同样非常有效,因为客户端永远不必花时间去等待第一个取得锁的客户端,它们只需要在自己的事务执行失败时进行重试就可以了。

没有使用事务的错误代码

public class Test {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(3);
Runnable run = () -> {
Jedis conn = new Jedis("10.1.1.161", 6379);
conn.select(2);

// 每次执行前先打印一下值
System.out.println("当前线程为:" + Thread.currentThread().getName() + "当前的值为:" + conn.get("notrans:"));
conn.incr("notrans:");
try {
TimeUnit.MILLISECONDS.sleep(1); //

System.out.println("当前线程为:" + Thread.currentThread().getName() + "当前的值为:" + conn.get("notrans:"));

conn.incrBy("notrans:", -1);

System.out.println("当前线程为:" + Thread.currentThread().getName() + "当前的值为:" + conn.get("notrans:"));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
countDownLatch.countDown();
}
};

for (int i = 0; i < 3; i++) {
new Thread(run).start();
}

countDownLatch.await();
}
}

输出:

当前线程为:Thread-1当前的值为:0
当前线程为:Thread-0当前的值为:0
当前线程为:Thread-2当前的值为:0
当前线程为:Thread-1当前的值为:3
当前线程为:Thread-0当前的值为:3
当前线程为:Thread-2当前的值为:3
当前线程为:Thread-1当前的值为:0
当前线程为:Thread-0当前的值为:0
当前线程为:Thread-2当前的值为:0

可以看到虽然最终的结果都是 0 ,但是在中间的时候值为 3

使用了事务后

继续借用书上的代码:

在 Java 中使用

public class Test {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
Jedis conn = new Jedis("10.1.1.161", 6379);
conn.select(2);
// 创建一个事务型流水线对象
Pipeline pipelined = conn.pipelined();
// 每次执行前先打印一下值,
// 注意,管道中不能使用 conn 操作 Redis,读也不行,而它的值得管道全部执行完才能拿到,所以需要把这个读结果的操作放在 sync 之后
Response<String> res1 = pipelined.get("notrans:");

pipelined.incr("notrans:");
try {
TimeUnit.MILLISECONDS.sleep(1); //
Response<String> res2 = pipelined.get("notrans:");
pipelined.incrBy("notrans:", -1);
Response<String> res3 = pipelined.get("notrans:");
pipelined.sync(); // 执行 EXEC

System.out.println("当前线程为:" + Thread.currentThread().getName() + "当前的值为:" + res1.get());
System.out.println("当前线程为:" + Thread.currentThread().getName() + "当前的值为:" + res2.get());
System.out.println("当前线程为:" + Thread.currentThread().getName() + "当前的值为:" + res3.get());
} catch (InterruptedException e) {

Thread.currentThread().interrupt();
} finally {
countDownLatch.countDown();
}
}).start();
}
countDownLatch.await();
}
}

输出的结果

当前线程为:Thread-2当前的值为:0
当前线程为:Thread-2当前的值为:1
当前线程为:Thread-2当前的值为:0
当前线程为:Thread-0当前的值为:0
当前线程为:Thread-0当前的值为:1
当前线程为:Thread-0当前的值为:0
当前线程为:Thread-1当前的值为:0
当前线程为:Thread-1当前的值为:1
当前线程为:Thread-1当前的值为:0

可以发现这个输出结果就正常了

补充:管道的使用(事务)

直接看原文 Java Redis Pipeline 使用示例 更详细,这里只是简单介绍一下 Pipeline 的使用方式和注意点

public class HowToTest {

public static void main(String[] args) {
// 连接池
JedisPool jedisPool = new JedisPool("192.168.1.9", 6379);

/* 操作Redis */
Jedis jedis = null;
try {
jedis = jedisPool.getResource();

TimeLag t = new TimeLag();

System.out.println("操作前,全部Key值:" + jedis.keys("*"));
Pipeline p = jedis.pipelined();
/* 插入多条数据 */
for(Integer i = 0; i < 100000; i++) {
p.set(i.toString(), i.toString());
}

/* 删除多条数据 */
for(Integer i = 0; i < 100000; i++) {
p.del(i.toString());
}
p.sync();
System.out.println("操作前,全部Key值:" + jedis.keys("*"));

System.out.println(t.cost());
} finally {
if (jedis != null) {
jedis.close();
}
}
}

}

管道的速度~

为什么 Pipelining 这么快?

先看看原来的多条命令,是如何执行的:

sequenceDiagram
Redis Client->>Redis Server: 发送第1个命令
Redis Server->>Redis Client: 响应第1个命令
Redis Client->>Redis Server: 发送第2个命令
Redis Server->>Redis Client: 响应第2个命令
Redis Client->>Redis Server: 发送第n个命令
Redis Server->>Redis Client: 响应第n个命令

Pipeling 机制是怎样的呢:

sequenceDiagram
Redis Client->>Redis Server: 发送第1个命令(缓存在Redis Client,未即时发送)
Redis Client->>Redis Server: 发送第2个命令(缓存在Redis Client,未即时发送)
Redis Client->>Redis Server: 发送第n个命令(缓存在Redis Client,未即时发送)
Redis Client->>Redis Server: 发送累积的命令
Redis Server->>Redis Client: 响应第1、2、n个命令

节约了响应的时间,所以非常快

管道的局限性

基于其特性,它有两个明显的局限性:

鉴于 Pipepining 发送命令的特性,Redis 服务器是以队列来存储准备执行的命令,而队列是存放在有限的内存中的,所以不宜一次性发送过多的命令。如果需要大量的命令,可分批进行,效率不会相差太远滴,总好过内存溢出嘛~~

由于 pipeline 的原理是收集需执行的命令,到最后才一次性执行。所以无法在中途立即查得数据的结果(需待 pipelining 完毕后才能查得结果),这样会使得无法立即查得数据进行条件判断(比如判断是非继续插入记录)。

response.get()p.sync(); 完毕前无法执行,否则,会报异常

非事务型流水线

被 MULTI 和 EXEC 包裹的命令在执行时不会被其他客户端打扰。而使用事务的其中一个好处就是底层的客户端会通过使用流水线来提高事务执行时的性能。

但是 MULTI 和 EXEC 也会消耗资源,并且可能会导致其他重要的命令被延迟执行。

那如何在不使用事务的情况下(可以让用户同时执行多个不同的命令),通过使用流水线来进一步提升命令的执行性能呢?

pipe = conn.pipeline()

在执行 pipeline() 时传入 True 作为参数,或者不传入任何参数,那么客户端将使用 MULTI 和 EXEC 包裹起用户要执行的所有命令。如果传入 False 为参数,那么客户端同样会像执行事务那样收集用户要执行的所有命令,只是不再使用 MULTI 和 EXEC 包裹这些命令。

如果用户需要向 Redis 发送多个命令,并且对于这些命令来说,一个命令的执行结果并不会影响另一个命令的输入,而且这些命令也不需要以事务的方式来执行。

下面是没有使用非事务型流水线代码:

public void updateToken(Jedis conn, String token, String user, String item) {
long timestamp = System.currentTimeMillis() / 1000;
conn.hset("login:", token, user);
conn.zadd("recent:", timestamp, token);
if (item != null) {
conn.zadd("viewed:" + token, timestamp, item);
conn.zremrangeByRank("viewed:" + token, 0, -26);
conn.zincrby("viewed:", -1, item);
}
}

使用非实物型流水线后:

public void updateTokenPipeline(Jedis conn, String token, String user, String item) {
long timestamp = System.currentTimeMillis() / 1000;
Pipeline pipe = conn.pipelined();
// pipe.multi();
pipe.hset("login:", token, user);
pipe.zadd("recent:", timestamp, token);
if (item != null){
pipe.zadd("viewed:" + token, timestamp, item);
pipe.zremrangeByRank("viewed:" + token, 0, -26);
pipe.zincrby("viewed:", -1, item);
}
// pipe.exec();
}